Send WS messages in parallel and reuse serialized data#1425
Send WS messages in parallel and reuse serialized data#1425karashiiro merged 1 commit intoUniversalis-FFXIV:v2from
Conversation
📝 WalkthroughWalkthroughThis PR introduces message serialization caching and parallel message dispatch optimization. A caching mechanism stores pre-serialized bytes on SocketMessage instances for reuse. The message dispatch in SocketProcessor shifts from synchronous iteration to Parallel.ForEach with per-connection error handling. SocketClient integrates the cached serialization directly. Comprehensive tests validate caching behavior and parallel dispatch resilience. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~23 minutes Possibly related PRs
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/Universalis.Application/Realtime/SocketProcessor.cs (1)
43-49: Consider adding a writer flush before reading the stream.The
BsonBinaryWritermay buffer data internally. While MongoDB's implementation typically flushes on dispose, explicitly flushing beforeToArray()would be more defensive.🔎 Proposed fix
private static byte[] SerializeMessage(SocketMessage message) { using var stream = MemoryStreamPool.GetStream(); using var writer = new BsonBinaryWriter(stream); BsonSerializer.Serialize(writer, message.GetType(), message); + writer.Flush(); return stream.ToArray(); }src/Universalis.Application/Realtime/Messages/SocketMessage.cs (1)
28-37: Implementation is correct; consider consistency with SocketProcessor.SerializeMessage.The on-the-fly serialization path here mirrors
SocketProcessor.SerializeMessage. Both could benefit from an explicitwriter.Flush()beforeToArray()for defensive coding.Also, there's minor duplication between this fallback path and
SerializeMessageinSocketProcessor. If this becomes a maintenance concern, consider extracting a shared serialization helper.🔎 Proposed fix for flush
internal ReadOnlyMemory<byte> GetSerializedBytes(RecyclableMemoryStreamManager pool) { if (CachedSerializedBytes != null) return CachedSerializedBytes; using var stream = pool.GetStream(); using var writer = new BsonBinaryWriter(stream); BsonSerializer.Serialize(writer, GetType(), this); + writer.Flush(); return stream.ToArray(); }
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
src/Universalis.Application.Tests/Realtime/Messages/SocketMessageTests.cssrc/Universalis.Application.Tests/Realtime/SocketProcessorTests.cssrc/Universalis.Application/Realtime/Messages/SocketMessage.cssrc/Universalis.Application/Realtime/SocketClient.cssrc/Universalis.Application/Realtime/SocketProcessor.cssrc/Universalis.Application/Universalis.Application.csproj
🧰 Additional context used
🧬 Code graph analysis (3)
src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs (3)
src/Universalis.Application/Realtime/SocketProcessor.cs (2)
SocketProcessor(18-95)Publish(51-77)src/Universalis.Application/Realtime/ISocketClient.cs (1)
Push(14-14)src/Universalis.Application/Realtime/Messages/SocketMessage.cs (2)
SocketMessage(9-38)SocketMessage(20-23)
src/Universalis.Application.Tests/Realtime/Messages/SocketMessageTests.cs (1)
src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs (5)
Fact(20-35)Fact(37-72)Fact(74-98)Fact(100-128)Fact(130-156)
src/Universalis.Application/Realtime/SocketProcessor.cs (2)
src/Universalis.Application/Realtime/Messages/SocketMessage.cs (2)
SocketMessage(9-38)SocketMessage(20-23)src/Universalis.Application/Realtime/ISocketClient.cs (1)
Push(14-14)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Codacy Static Code Analysis
- GitHub Check: Analyze (csharp)
🔇 Additional comments (7)
src/Universalis.Application/Realtime/SocketClient.cs (1)
269-273: LGTM! Clean simplification of the send logic.The refactored
SendEventnow correctly leverages the newGetSerializedBytesAPI which handles both cached (fromSocketProcessor.Publish) and on-the-fly serialization (for per-client messages likeSubscribeFailure). The singleSendAsynccall withEndOfMessageis cleaner than the previous streaming loop.src/Universalis.Application.Tests/Realtime/Messages/SocketMessageTests.cs (2)
29-41: Good test coverage for serialization paths.The tests comprehensively cover the caching mechanism. Minor note: Lines 39-40 have slightly redundant assertions (
Assert.False(result.IsEmpty)followed byAssert.True(result.Length > 0)), but this doesn't affect correctness.
74-87: Good coverage for the fallback serialization path.This test correctly validates that
SubscribeFailuremessages (sent per-client without pre-caching) serialize properly via the on-the-fly path.src/Universalis.Application/Realtime/SocketProcessor.cs (1)
51-77: Solid parallel dispatch implementation with proper error isolation.The approach correctly:
- Serializes once before dispatch (line 56)
- Uses
Parallel.ForEachwith bounded parallelism- Isolates per-connection failures with try/catch
- Tracks exceptions via counter
One observation:
Publishmutates the incomingmessageby settingCachedSerializedBytes. This is fine for the current usage pattern but could be surprising if a caller ever reuses a message object across multiplePublishcalls with different content expectations.src/Universalis.Application/Universalis.Application.csproj (1)
8-10: LGTM! Standard pattern for test access to internals.Exposing internals to the test assembly is the correct approach for testing internal caching behavior without polluting the public API.
src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs (2)
20-35: Good test for caching side effect.This test correctly verifies that
PublishpopulatesCachedSerializedByteson the message, which is the core optimization of this PR.
37-72: Important test for error resilience in parallel dispatch.This test validates that a failing client doesn't prevent other clients from receiving messages—a critical property for the
Parallel.ForEachimplementation. The verification thatbadClient.Pushwas still called (line 71) confirms the exception was thrown and handled gracefully.
Parallel.ForEachto send WS messages in parallel instead of allowing them to fan out linearly with the number of connected clientsSummary by CodeRabbit
Tests
Bug Fixes
✏️ Tip: You can customize this high-level summary in your review settings.